嗨!今天要說明在Flask使用MQTT通訊協定做資料傳輸!
以前我都是使用nodejs實現,用Flask是因為我的小夥伴忙著解其他Bug所以我就幫他試了,順便介紹一下MQTT的基本概念。
nodejs怎麼實現MQTT:MQTT是一種 machine-to-machine(M2M)的輕量級通訊協定,可以讓各種設備互相溝通,所需要的運算與傳輸頻寬很低。
MQTT的傳送方式是藉由MQTT broker做分發,如下圖:
Sensor端接收到溫度之後,透過Publish將收到的資料發送,讓Subscribe端接收,一次可能會有好多組Publish與Subscribe,就是透過中間的broker端去根據不同的主題(Topic)做資料派送。
理解MQTT的概念之後就可以來安裝broker了,這邊我們安裝一個叫做Mosquitto的伺服器。
Mac只需要用brew安裝就好:
$ brew install mosquitto
完成之後,打開伺服器服務:
$ brew services start mosquitto
$ sudo apt-get install mosquitto mosquitto-clients
好了之後來測試一下Mosquitto,打開兩個終端機:
mosquitto_sub
$ mosquitto_sub -h localhost -t test
mosquitto_pub
$ mosquitto_pub -h localhost -t test -m "Hello world!"
發送之後就可以看到訂閱端的終端機收到Hello world!了
-h: host 沒有特別打的話預設是localhost。-t: Topic 主題,訂閱跟發送端需要同一個主題,上面我們設定的主題都是test。-m: 發送端多了-m,後面打上要發送的內容。介紹MQTT之後,來試著在Flask中加上MQTT的發送與接收吧,Flask的詳細這邊就不多做解釋了,在Flask中有個叫做flask_mqtt的套件,可以輕鬆的使用MQTT功能。
pip安裝flask_mqtt:$ pip insatll flask_mqtt
from flask import Flask
from flask_mqtt import Mqtt
app = Flask(__name__)
app.config['MQTT_BROKER_URL'] = 'localhost'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_REFRESH_TIME'] = 1.0
mqtt = Mqtt(app)
@app.route('/')
def index():
return 'hello world'
mqtt.subscribe('myTopic'),要讓啟動(前面的設定)之後正確處理我們的訂閱,我們需要將訂閱方法放在一個on_connect(),callback function內:@mqtt.on_connect()
def handle_connect(client, userdata, flags, rc):
mqtt.subscribe('mytopic')
mqtt.publish('mytopic', 'any message you want to pubish')
on_message(),來讀取收到的內容:@mqtt.on_message()
def handle_mqtt_message(client, userdata, message):
topic=message.topic,
payload=message.payload.decode()
message.topic : 收到的主題。message.payload.decode(): 該主題的內容。最後來看一下範例吧,在這之前我們先基本設定:
from flask import Flask
from flask_mqtt import Mqtt
from flask import render_template
app = Flask(__name__)
app.config['MQTT_BROKER_URL'] = '127.0.0.1'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_REFRESH_TIME'] = 1.0
mqtt = Mqtt(app)
if __name__ == '__main__':
app.run(debug=True)
route (這裡其實可以省略)@app.route('/')
def index():
return 'Index Hello World'
<want_to_pub>接收變數,function內會先判斷是否有內容。有的話mqtt會將want_to_pub的內容當作msg發送給訂閱mytopic主題的訂閱端。@app.route('/api/v1.0/mqtt/pub/<want_to_pub>', methods=['GET'])
def pub_my_msg(want_to_pub):
if len(want_to_pub) == 0:
abort(404)
mqtt.publish('mytopic',want_to_pub )
return want_to_pub
好了之後來測試是否成功,到 localhost:5000 (此為Flask預設),看是否有看到頁面上有Index Hello World字樣。此處成功之後到localhost:5000/api/v1.0/mqtt/pub/會看到Not Found字樣,這是正常的,因為我們後面沒有加上任何東西。
$ mosquitto_sub -t mytopic
localhost:5000/api/v1.0/mqtt/pub/後面加上任何想發送的訊息mymsg:localhost:5000/api/v1.0/mqtt/pub/mymsg
mymsg

這樣就成功了!
接下來要來測試訂閱功能了,這個範例主要是終端機發佈Json訊息,讓Flask訂閱。
為了解析Json內容,我們要先在程式碼內新增:
import json
@mqtt.on_connect()
def handle_connect(client, userdata, flags, rc):
mqtt.subscribe('mytopic')
@mqtt.on_message()
def handle_mqtt_message(client, userdata, message):
payload = message.payload.decode()
p = json.loads(payload)
print("-------msg-------")
print('name :', p['name'])
print('email :', p['email'])
好了之後就可以執行了。
mosquitto_pub -t mytopic -m '{"name" : "Jack mqtt", "email":"jack@mqtt.example.com" }'

Flask-MQTT使用方法可以到:謝謝大家!掰掰~